/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.engine.impl;

import com.chinamobile.cmss.lakehouse.common.Constants;
import com.chinamobile.cmss.lakehouse.common.dto.engine.KubernetesCommonTaskReq;
import com.chinamobile.cmss.lakehouse.common.dto.engine.LakehouseResponse;
import com.chinamobile.cmss.lakehouse.common.dto.engine.LocalSQLTaskReq;
import com.chinamobile.cmss.lakehouse.common.dto.engine.SQLTaskKillReq;
import com.chinamobile.cmss.lakehouse.common.enums.HttpStatus;
import com.chinamobile.cmss.lakehouse.common.enums.TaskType;
import com.chinamobile.cmss.lakehouse.common.utils.EngineUtils;
import com.chinamobile.cmss.lakehouse.common.utils.PropertyUtils;
import com.chinamobile.cmss.lakehouse.engine.spark.SparkCommandBuilder;
import com.chinamobile.cmss.lakehouse.service.engine.EngineService;
import com.chinamobile.cmss.lakehouse.service.engine.SparkEngine;

import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@Slf4j
@Service
public class SparkKubernetesEngineServiceImpl extends SparkEngine implements EngineService {

    // Defaults for SQL tasks, resolved once from engine configuration when the bean is created.
    private final String sqlMainClass = PropertyUtils.getString(Constants.ENGINE_SQL_APP_MAIN_CLASS);
    private final String sqlAppSource = PropertyUtils.getString(Constants.ENGINE_SQL_APP_RESOURCE);

    /**
     * Builds a spark-submit command for a local-mode SQL task and returns the generated app name.
     *
     * <p>NOTE(review): the actual submission ({@code submit(builder)}) is commented out below, so
     * this method currently only assembles the command — confirm whether that is intentional.
     *
     * @param req local SQL task request carrying master, SQL context, database and extra config
     * @return the generated application name
     */
    @Override
    public String submitSQLTaskLocal(LocalSQLTaskReq req) {
        SparkCommandBuilder builder = new SparkCommandBuilder();
        String appName = EngineUtils.generateAppName("spark-sql-local");
        builder.appName(appName).master(req.getMaster()).setMainClass(sqlMainClass)
            .setAppResource(sqlAppSource);

        // Pass the target database as an extra program argument unless it is the default one.
        if (!"default".equals(req.getDefaultDatabase())) {
            builder.args(req.getSqlContext(), req.getDefaultDatabase());
        } else {
            builder.args(req.getSqlContext());
        }
        if (req.getConfig() == null) {
            req.setConfig(new HashMap<>());
        }
        // Hive metastore URIs. FIX: the key was previously the garbled
        // "spark.hive.metastore.urishive.metastore.uris" (accidental concatenation),
        // so local tasks never picked up the metastore; use the same key as transReq2Conf().
        if (StringUtils.isNotEmpty(req.getMetastoreUris())) {
            req.getConfig().put("spark.hive.metastore.uris", req.getMetastoreUris());
        }
        // Config is guaranteed non-null here (initialized above), so apply it unconditionally.
        builder.setConf(req.getConfig());

        // submit task
        // submit(builder);

        return appName;
    }

    /**
     * Validates a Kubernetes SQL task request, translates it into Spark configuration and
     * submits it via spark-submit.
     *
     * @param req common Kubernetes task request (image, driver pod name, resources, config)
     * @return 400 response on validation failure, otherwise a success response whose payload
     *     is the submission log path
     */
    @Override
    public LakehouseResponse submitSQLTaskOnK8s(KubernetesCommonTaskReq req) {
        String master = req.getMaster();
        if (!isK8s(master)) {
            // 400 — isK8s also rejects a non-null master that is not a k8s:// URL
            return new LakehouseResponse(HttpStatus.BAD_REQUEST,
                "Master can not be null and must be a kubernetes master!");
        }
        if (StringUtils.isEmpty(req.getImage())) {
            // 400
            return new LakehouseResponse(HttpStatus.BAD_REQUEST, "Kubernetes image can not be null!");
        }
        if (StringUtils.isEmpty(req.getDriverPodName())) {
            // 400
            return new LakehouseResponse(HttpStatus.BAD_REQUEST, "Kubernetes driver pod name can not be null!");
        }
        // The driver pod name doubles as the Spark application name.
        String appName = req.getDriverPodName();
        transReq2Conf(req);

        SparkCommandBuilder builder = new SparkCommandBuilder();
        // Fall back to the configured SQL defaults when the request does not override them.
        String mainClass = null == req.getMainClass() ? sqlMainClass : req.getMainClass();
        String jarPath = null == req.getJarPath() ? sqlAppSource : req.getJarPath();
        builder.appName(appName).master(req.getMaster()).setDeployMode(req.getDeployMode())
            .setMainClass(mainClass).setAppResource(jarPath);
        if (StringUtils.isNotEmpty(req.getProxyUser())) {
            builder.setProxyUser(req.getProxyUser());
        }

        // Console tasks targeting a non-default database get the database passed via Spark conf.
        if (TaskType.CONSOLE.equals(req.getType()) && !"default".equals(req.getDefaultDatabase())) {
            builder.setConf("spark.sql.default.database", req.getDefaultDatabase());
        }

        if (req.getEngineArgs() != null) {
            builder.args(req.getEngineArgs());
        }

        if (req.getConfig() != null) {
            builder.setConf(req.getConfig());
        }
        // submit task
        String logPath = submit(builder);

        return LakehouseResponse.success(logPath);
    }

    /**
     * Kills a running Kubernetes SQL task identified by its driver pod.
     *
     * @param req kill request carrying master, optional namespace and driver pod id
     * @return 400 response when the master is not a Kubernetes master, otherwise success
     */
    @Override
    public LakehouseResponse killSQLTaskOnK8s(SQLTaskKillReq req) {
        String master = req.getMaster();
        if (!isK8s(master)) {
            // 400 — isK8s also rejects a non-null master that is not a k8s:// URL
            return new LakehouseResponse(HttpStatus.BAD_REQUEST,
                "Master can not be null and must be a kubernetes master!");
        }
        SparkCommandBuilder builder = new SparkCommandBuilder();
        builder.master(master);

        // Qualify the pod id with its namespace ("<namespace>:<podId>") when one is supplied.
        String driverPodId = req.getDriverPodId();
        if (StringUtils.isNotEmpty(req.getNamespace())) {
            driverPodId = String.format("%s:%s", req.getNamespace(), driverPodId);
        }
        kill(builder, driverPodId);
        return LakehouseResponse.success(true);
    }

    /**
     * Translates the strongly-typed request fields into Spark configuration entries and merges
     * them into {@code req.getConfig()} (creating the map if absent). Entries derived here
     * overwrite any identically-keyed entries already present in the request config.
     *
     * @param req the Kubernetes task request to enrich in place
     */
    private void transReq2Conf(KubernetesCommonTaskReq req) {
        Map<String, String> sparkConf = new HashMap<>();
        // Resources. NOTE(review): if the cores/instances getters return boxed nulls,
        // String.valueOf produces the literal "null" — assumed to be primitives; confirm.
        sparkConf.put("spark.driver.memory", req.getDriverMemory());
        sparkConf.put("spark.executor.memory", req.getExecutorMemory());
        sparkConf.put("spark.executor.cores", String.valueOf(req.getExecutorCores()));
        sparkConf.put("spark.executor.instances", String.valueOf(req.getInstances()));
        sparkConf.put("spark.driver.cores", String.valueOf(req.getDriverCores()));

        // Hive metastore URIs (same key as submitSQLTaskLocal).
        if (StringUtils.isNotEmpty(req.getMetastoreUris())) {
            sparkConf.put("spark.hive.metastore.uris", req.getMetastoreUris());
        }

        // Kubernetes namespace for driver/executor pods.
        if (StringUtils.isNotEmpty(req.getNamespace())) {
            sparkConf.put("spark.kubernetes.namespace", req.getNamespace());
        }

        if (StringUtils.isNotEmpty(req.getServiceAccountName())) {
            sparkConf.put("spark.kubernetes.authenticate.driver.serviceAccountName",
                req.getServiceAccountName());
        }

        if (StringUtils.isNotEmpty(req.getPullPolicy())) {
            sparkConf.put("spark.kubernetes.container.image.pullPolicy", req.getPullPolicy());
        }

        // Image and driver pod name were validated as non-empty by the caller.
        sparkConf.put("spark.kubernetes.container.image", req.getImage());
        sparkConf.put("spark.kubernetes.driver.pod.name", req.getDriverPodName());

        if (StringUtils.isNotEmpty(req.getUploadPath())) {
            sparkConf.put("spark.kubernetes.file.upload.path", req.getUploadPath());
        }

        if (req.getConfig() == null) {
            req.setConfig(new HashMap<>(sparkConf));
        } else {
            req.getConfig().putAll(sparkConf);
        }
    }
}
